\section{Query Answering Subsystem}
\label{sec:queryAnsweringSubsystem}
\index{Subsystem!Query Answering}

\subsection{Principal Components}

This subsystem handles the interaction between producer and consumer
services required to answer queries. It covers the entire streaming
process that starts (for a query) just after a producer receives a
start call from a consumer and ends either when all tuples have been
sent, or the query is aborted. It also covers timing out of queries
and streaming connections. 
\subsection{Streaming protocol}

Every new streaming connection that is opened must begin with a 4 byte header containing an
integer identifying the streaming protocol being used. This is the negative of a
version number of the streaming protocol.

The header must be sent at the start of every new streaming connection but not in between
result sets. If a connection is broken and has to be reopened, the reopened connection must
send the header bytes again.

A StreamingProtocol is a system for encoding result sets into a byte stream and decoding
them from a byte stream. It has the methods: 
getTupleEncoder and getTupleDecoder.

A tuple encoder takes result sets and encodes them into a byte stream suitable for streaming.
It also provides a \texttt{getHeader} method to retrieve the 4 byte header which must
begin the streaming message. It encodes complete result sets only via the \texttt{encode} method.




A tuple decoder takes a byte stream and decodes it into result sets. The byte stream is taken
directly from the streaming connection but should \textbf{not} include the 4 byte header.
Since the streamed bytes may arrive in chunks which do not correspond to complete result sets,
the TupleDecoder has two methods: \texttt{pushBytes} to read streaming data, and \texttt{popResults}
to return complete result sets. \texttt{popResults} should normally be called after each call
to \texttt{pushBytes} and may return zero, one or multiple result sets.
Once a chunk of bytes has been pushed to a TupleDecoder it must either decode them into a 
complete result set or store incomplete data internally. The original data passed to \texttt{pushBytes}
may be discarded or overwritten.




\subsection{Producer side}

\subsubsection*{StreamingSender}



The streaming sender is a single thread which iterates over a set of
StreamingSource objects, each of which is associated with a single streaming
connection. For each StreamingSource, it calls \texttt{popBytes}, passing a buffer
to be filled with data to send. The size of this buffer defines the maximum 
amount of data that may be sent with each iteration and is analagous to the
chunk size in the StreamingReceiver (but again is unrelated to the result
set chunk size). The bytes returned are written to the socket connection.

If a socket connection fails, the \texttt{reset} method is called on
the associated StreamingSource. This triggers the StreamingSource to
re-send the header bytes, and possibly re-send bytes corresponding to 
an incomplete result set.

Producers call \texttt{addQuery} to register a new RunningQuery. If 
a StreamingSource object already exists to the target consumer using
the same streaming protocol (new protocol only), the query is added
to this StreamingSource, otherwise a new StreamingSource is created
for this query.

\subsubsection*{StreamingSource}


The StreamingSource is associated with a single streaming connection.
It has one or more RunningQuery objects which it pops results from
to be sent on the streaming connection. It is instantiated with a 
particular StreamingProtocol.

With each call to popBytes, the StreamingSource attempts to fill
the supplied buffer with bytes. If the header bytes have not
yet been sent, these are sent first. Then, the StreamingSource
iterates through its RunningQuery object, popping a result set, 
encoding it to bytes using its TupleEncoder, and writing the bytes
to the send buffer. When the send buffer is full, it is returned
and the encoded bytes from the current result set are saved. The next
time \texttt{popBytes} is called, remaining bytes from this result set
are sent before moving on the the next RunningQuery.

If a RunningQuery returns a result set with the \texttt{endOfResults} flag set,
it is deleted from the list. If the StreamingSource has no RunningQueries,
it returns \texttt{false} to \texttt{popBytes}, indicating to the StreamingSender 
that the connection may be closed. To avoid connections being closed at the
same time as new queries being added, the \texttt{addQuery} and \texttt{popBytes}
methods must be synchronized and the StreamingSource must set an internal flag 
rejecting any further calls to \texttt{addQuery} before returning 
\texttt{false} to \texttt{popBytes}.
 
If the StreamingSender calls \texttt{reset}, the StreamingSource takes the following 
actions:

\begin{itemize}
\item It sets a flag so that the header bytes are sent again on the next call to
      \texttt{popBytes}.
\item It resets the send position on the byte buffer containing the current result set
      so that this result set is sent again from the beginning.
\end{itemize}

This method does not need to be synchronized with \texttt{popBytes}, since both methods
are only ever called by the main thread in the StreamingSender.

\subsubsection*{Running Query}



A Producer Resource (of any type) has a set of Running Query
instances, one for each consumer to which it is currently
streaming. This holds a Tuple Cursor and details of the Consumer
to which it is streaming. It enforces query timeouts irrespective
of whether notification is received from the Consumer. 

A RunningQuery is associated with a single StreamingSource object which
represents the connection (although it may not be the only RunningQuery
using this StreamingSource). It pops tuples from its cursor in response 
to pop requests from the StreamingSource.


\subsection{Consumer side}

\subsubsection*{StreamingReceiver}



The streaming receiver is a singleton which listens on the streaming
port. It receives connections from producers and forwards the associated byte
stream to a StreamingSink object, one of which is created for each connection that
is accepted. The byte stream is forwarded in chunks of a configurable size. This block
size does not affect the parsing of the stream or the chunk size of the results (number
of tuples per result set), it is purely an optimization parameter and is likely to be
fixed at a reasonable value initially.

The StreamingReceiver has a list of RunningReply objects which receive tuples for
consumers. It passes this list to each new StreamingSink so that it can identify
the correct reply for each decoded result set. Consumers register their queries
by calling \texttt{addReply} for each producer they send a query to. When a consumer is
destroyed or wishes to abort a query, it calls \texttt{removeReply} to remove relevant
RunningReply objects from the streaming receiver.

The StreamingReceiver also maintains a list of IP addresses of producers which
it expects to be contacted by. If a new connection is received from a host whose
address is not in this list, the connection is dropped immediately with no response.
This is intended to be a basic security precaution to block attacks from machines which
are not R-GMA servers, however it does not protect against malicious connections from
machines registered in the Registry as genuine R-GMA servers.



\subsubsection*{StreamingSink}



A streaming sink is created for each connection accepted by the StreamingReceiver.
It handles the stream of bytes as they arrive, initially reading the header
and instantiating the appropriate TupleDecoder for the requested streaming protocol.
Once this has been done, it passes the byte stream to the decoder and pops the
decoded result sets. For each result set, it locates the appropriate RunningReply
object and pushes the results on to it.

Location of the appropriate RunningReply is done using the \texttt{matches} method
which tests if a reply is relevant to a query intended for a specific consumer, from
a specific producer in response to a specific query. The first matching RunningReply
found receives the tuples. For connections from older R-GMA servers, the metadata 
included in the result set is limited, in general only the recipient resource ID is 
specified. To accommodate this, Consumers should submit a special RunningReply object
which only matches results with this limited information and will receive all tuples
from old producers using the Classic streaming protocol.

Location of the appropriate RunningReply object must be synchronized with the
\texttt{addReply} method from the StreamingReceiver. This is done by synchronizing
on the list object. This should not be a performance problem since only one
StreamingSink will try to synchronize at a time (because the StreamingReceiver is 
single-threaded), and neither will hold the lock for a long time.

\subsubsection*{Running Reply}

A Consumer Resource has a set of Running Reply instances, one for each
producer from which it is currently streaming. Its purpose is to encapsulate
the consumer's connection to the producer which may be tested (using \texttt{testProducer})
or aborted. It receives result sets from one or more StreamingSink objects and
generally adds them to the consumer's tuple stack, although it may ignore them
if the query has been aborted or timed out.
